package com.xiaomaoguai.fcp.pre.kepler.router.async;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xiaomaoguai.fcp.pre.kepler.router.buf.Buf;
import com.xiaomaoguai.fcp.pre.kepler.router.handler.api.HandlerContext;
import com.xiaomaoguai.fcp.pre.kepler.router.handler.enums.Instruction;
import com.xiaomaoguai.fcp.pre.kepler.router.router.RouterPipeline;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.RunnableWrapper;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link WorkPool} implementation that runs {@link HandlerContext} tasks on a
 * bounded {@link ThreadPoolExecutor}.
 *
 * <p>The executor is created in {@link #start()} and torn down by
 * {@link #shutdown()} / {@link #shutdownGracefully()}. Tasks submitted through
 * {@link #addTask(HandlerContext)} are wrapped in a {@link StandardRunnable}.
 */
@Slf4j
public class StandardWorkPool implements WorkPool<HandlerContext<?>> {

	// Number of core worker threads.
	private final Integer workThreadNum;

	private final static String THREAD_NAME_FORMAT = "Router-StandardWorkPool-processor-%d";

	// Default upper bound on pool size; raised to workThreadNum when that is larger.
	private static final int DEFAULT_MAX_THREADS = 64;

	// Idle time (in minutes) after which threads above the core size are reclaimed.
	private static final long KEEP_ALIVE_MINUTES = 1;

	// Capacity of the bounded task queue; submissions beyond it are rejected.
	private static final int QUEUE_CAPACITY = 2000;

	private final AtomicBoolean started = new AtomicBoolean(false);

	// volatile: assigned in start() and read by whichever threads call addTask()/shutdown().
	private volatile ThreadPoolExecutor threadPoolExecutor;

	/**
	 * Creates a pool with the given number of core threads.
	 *
	 * @param workThreadNum core thread count; when {@code null},
	 *                      {@code availableProcessors() + 1} is used
	 */
	public StandardWorkPool(Integer workThreadNum) {
		this.workThreadNum = (workThreadNum == null ? Runtime.getRuntime().availableProcessors() + 1 : workThreadNum);
	}

	/**
	 * Creates a pool with {@code availableProcessors() + 1} core threads.
	 */
	public StandardWorkPool() {
		this(Runtime.getRuntime().availableProcessors() + 1);
	}

	/**
	 * Task that drives a {@link HandlerContext} on a worker thread.
	 */
	public class StandardRunnable implements Runnable {

		private HandlerContext<?> hc;

		/**
		 * @param hc the handler context to execute asynchronously
		 */
		public StandardRunnable(HandlerContext<?> hc) {
			this.hc = hc;
		}

		/**
		 * Executes the wrapped context.
		 *
		 * <p>A {@link RouterPipeline} that is not a single pipe is invoked directly.
		 * Otherwise the context (or, for a single pipe, its next context) is run
		 * via {@code syncHandler()} and the returned {@link Instruction} decides
		 * whether to stop, jump to the last handler, or continue with the next one.
		 */
		@Override
		public void run() {
			Buf buf = hc.getBuf();
			String bufId = buf.getId();
			log.info("bufId:{} {} 开始异步执行", bufId, hc.getRouter().getRouterName());
			// Everything except a callback is a pipe.
			// SinglePipe exists to keep the buf thread-safe.
			// When running asynchronously, invoke the current handler directly
			// so that no further async task is created.
			if (hc instanceof RouterPipeline) {
				RouterPipeline pipe = (RouterPipeline) hc;
				if (pipe.isSinglePipe()) {
					hc = pipe.getNext();
				} else {
					pipe.invokeHandler();
					return;
				}
			}
			// For a callback, run the current handler first, then continue with the following handlers.
			Instruction instruction = null;
			try {
				instruction = hc.syncHandler();
				if (instruction == Instruction.BREAK) {
					return;
				}
			} catch (Throwable cause) {
				hc.exceptionHandler(cause);
				return;
			}
			if (instruction == Instruction.EXIT) {
				hc.getLast().invokeHandler();
				return;
			}
			hc.getNext().invokeHandler();
		}

	}

	/**
	 * Creates the underlying executor and marks the pool as running.
	 *
	 * @throws IllegalStateException    if the pool is already started
	 * @throws IllegalArgumentException if the configured thread count is rejected
	 *                                  by {@link ThreadPoolExecutor} (e.g. negative)
	 */
	@Override
	public void start() {
		if (!started.compareAndSet(false, true)) {
			throw new IllegalStateException(
					"WorkerPool has already been started and cannot be restarted until halted.");
		}

		try {
			ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
			threadFactoryBuilder.setNameFormat(THREAD_NAME_FORMAT);

			int corePoolSize = this.workThreadNum;
			// ThreadPoolExecutor rejects maximumPoolSize < corePoolSize, so never
			// let the default cap fall below the configured core size.
			int maximumPoolSize = Math.max(corePoolSize, DEFAULT_MAX_THREADS);

			this.threadPoolExecutor = new ThreadPoolExecutor(
					corePoolSize,
					maximumPoolSize,
					KEEP_ALIVE_MINUTES,
					TimeUnit.MINUTES,
					new LinkedBlockingQueue<>(QUEUE_CAPACITY),
					threadFactoryBuilder.build(),
					new ThreadPoolExecutor.AbortPolicy());
		} catch (RuntimeException e) {
			// Roll back the flag so a failed start does not leave the pool
			// reporting "running" with no executor behind it.
			started.set(false);
			throw e;
		}
	}

	/**
	 * Stops accepting new tasks and lets already submitted tasks finish.
	 * Does nothing to the executor if the pool was never started.
	 */
	@Override
	public void shutdownGracefully() {
		ThreadPoolExecutor executor = this.threadPoolExecutor;
		if (executor != null) {
			executor.shutdown();
		}
		started.set(false);
	}

	/**
	 * Stops the pool immediately via {@link ThreadPoolExecutor#shutdownNow()}.
	 * Does nothing to the executor if the pool was never started.
	 */
	@Override
	public void shutdown() {
		ThreadPoolExecutor executor = this.threadPoolExecutor;
		if (executor != null) {
			executor.shutdownNow();
		}
		started.set(false);
	}

	/**
	 * @return {@code true} between a successful {@link #start()} and the next shutdown call
	 */
	@Override
	public boolean isRunning() {
		return started.get();
	}

	/**
	 * Submits a handler context for asynchronous execution.
	 *
	 * @param hc the context to run on a worker thread
	 * @throws IllegalStateException if {@link #start()} has never been called
	 * @throws java.util.concurrent.RejectedExecutionException if the executor is
	 *         shut down or its queue and threads are saturated (abort policy)
	 */
	@Override
	public void addTask(HandlerContext<?> hc) {
		ThreadPoolExecutor executor = this.threadPoolExecutor;
		if (executor == null) {
			throw new IllegalStateException("WorkerPool has not been started.");
		}
		executor.execute(RunnableWrapper.of(new StandardRunnable(hc)));
	}

}
